HiSEN

WebSocket - Java & html & JavaScript - 单发 & 群发

一、背景说明

最近在做app后台相关接口

自建通知中心目前不能很好的支持给APP推送消息

长连接可以保持推送速度,目前app中内嵌了H5,所以考虑使用websocket

之前没有接触过websocket,百度了一堆之后,页面上可以正常使用

但是没有发现可用使用Java后台进行消息的发送,于是乎就琢磨了一上午,解决了这个问题

现在把这个小工程分享给大家,少走点弯路==

ps:很多不能在后台发送消息,是因为缺少java的客户端

二、准备工作

建立一个maven web 工程

添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.neovisionaries</groupId>
<artifactId>nv-websocket-client</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>

三、主要代码

websocket服务端主逻辑

为了实现简单的非群发操作,在连接websocket的时候,加上了一些get参数

例如:ws://localhost:8080/websocket?sendTo=hisen&method=methodSingle&user=hisenyuan

然后在后端判断,根据参数做出不同的动作

demo完整工程:https://github.com/hisenyuan/IDEAPractice/tree/master/websocket-demo

配置完Tomcat,即可使用,在java后台运行测试类(com.hisen.ws.client.ClientApp4Java)可发送消息到页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package com.hisen.ws.server;

import com.hisen.ws.util.Constants;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
* @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
* 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
*/
@ServerEndpoint("/websocket")
public class WebSocketServer {
// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;

// 实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
// 当前用户
private String user;

/**
* 客户端可以是web页面,也可以是Java后台
* <p>
* 通过连接或者message可以控制发送给谁
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
// 获取url传过来的参数
Map<String, List<String>> parameterMap = session.getRequestParameterMap();
// 发送方式
String method = null;
// 发送给哪些人
List<String> receivers = new ArrayList<>();
// 发送者
String sernder = null;
if (parameterMap.containsKey(Constants.METHOD)) {
method = parameterMap.get(Constants.METHOD).get(0);
}
if (parameterMap.containsKey(Constants.SEND_TO)) {
receivers = parameterMap.get(Constants.SEND_TO);
}
if (parameterMap.containsKey(Constants.USER)) {
sernder = parameterMap.get(Constants.USER).get(0);
}

System.out.println("sender:" + sernder + ",receivers:" + receivers.toString() + ",method:" + method);
if (method == null || method.equals(Constants.METHOD_ALL)) {
//发送所有
send2All(message);
} else {
//单发
send2Users(receivers, message);
}

}

/**
* 连接建立成功调用的方法
*
* @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.user = session.getRequestParameterMap().get(Constants.USER).get(0);
// 放入map
webSocketMap.put(user, this);
//在线数加1
addOnlineCount();
System.out.println("有新连接加入!当前在线人数为" + getOnlineCount() + ",session:" + session.getId() + ",user:" + this.user);
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
// 移除
webSocketMap.remove(this.user);
//在线数减1
subOnlineCount();
System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount() + ",user:" + this.user);
}


private void send2Users(List<String> receivers, String message) {
receivers.forEach(e -> {
System.out.println("single receiver:" + e);
Optional.ofNullable(webSocketMap.get(e))
.filter(webSocketServer -> webSocketServer.session.isOpen())
.ifPresent(webSocketServer -> sendOnce(message, e, webSocketServer));
});
}

private void send2All(String message) {
webSocketMap.forEach((key, value) -> {
sendOnce(message, key, value);
});
}

private void sendOnce(String message, String e, WebSocketServer webSocketServer) {
try {
webSocketServer.sendMessage(message);
} catch (IOException exp) {
System.out.println("发送出错,receiver:" + e);
}
}

/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("发生错误,user:" + this.user);
error.printStackTrace();
}

/**
* 自定义的方法
*
* @param message
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
//this.session.getAsyncRemote().sendText(message);
}

public static synchronized int getOnlineCount() {
return onlineCount;
}

public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}

public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}